feat: add Cron and OnEvent policy support to DataProcess#5969
adityaupasani2 wants to merge 6 commits
DataProcess only supported Once policy, unlike DataLoad and DataMigrate which support Once, Cron and OnEvent. This adds:

- Policy and Schedule fields to DataProcessSpec in API types
- CronStatusHandler for scheduled DataProcess execution
- OnEventStatusHandler for event-triggered DataProcess execution
- Update GetStatusHandler() to switch on policy

Closes fluid-cloudnative#5948

Signed-off-by: Aditya Upasani <adityaupasani29@gmail.com>
Code Review
This pull request introduces policy-based data processing (Once, Cron, OnEvent) to the DataProcess CRD, adding Policy and Schedule fields to DataProcessSpec and implementing corresponding status handlers (CronStatusHandler and OnEventStatusHandler). The review comments identify three important issues: a potential nil pointer panic in CronStatusHandler if LastScheduleTime is nil, a potential nil pointer dereference if GetStatusHandler returns nil for unrecognized policies (suggesting a default to OnceStatusHandler), and a risk of accessing the wrong job condition in OnEventStatusHandler by indexing job.Status.Conditions[0] directly instead of using the already-retrieved finishedJobCondition.
    result.LastScheduleTime = cronjobStatus.LastScheduleTime
    result.LastSuccessfulTime = cronjobStatus.LastSuccessfulTime

If the CronJob has not been scheduled yet, cronjobStatus.LastScheduleTime will be nil. Dereferencing it directly with *cronjobStatus.LastScheduleTime or accessing cronjobStatus.LastScheduleTime.Time will cause a nil pointer panic. A nil check must be added before accessing it.
Suggested change:

    if cronjobStatus.LastScheduleTime == nil {
    	ctx.Log.Info("CronJob has not been scheduled yet", "namespace", ctx.Namespace, "cronjobName", cronjobName)
    	return
    }
    result.LastScheduleTime = cronjobStatus.LastScheduleTime
    result.LastSuccessfulTime = cronjobStatus.LastSuccessfulTime

Next review comment, on this code:

    policy := r.dataProcess.Spec.Policy
    switch policy {
    case datav1alpha1.Once:
    	return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
    case datav1alpha1.Cron:
    	return &CronStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
    case datav1alpha1.OnEvent:
    	return &OnEventStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
    default:
    	return nil
    }

Returning nil from GetStatusHandler when the policy is empty or unrecognized can cause a nil pointer dereference panic in the caller. It is safer to default to OnceStatusHandler to ensure robust behavior.
Suggested change:

    policy := r.dataProcess.Spec.Policy
    switch policy {
    case datav1alpha1.Cron:
    	return &CronStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
    case datav1alpha1.OnEvent:
    	return &OnEventStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
    case datav1alpha1.Once:
    	fallthrough
    default:
    	return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
    }
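
The fallback can be tried in isolation. The following is a minimal sketch, not the Fluid code: `StatusHandler`, `handlerFor`, and the three handler structs are hypothetical stand-ins reduced to a single method, and only the switch shape comes from the suggestion in this thread.

```go
package main

import "fmt"

// Policy and the handler types below are simplified stand-ins for the Fluid
// types discussed in this review; they are not the project's real definitions.
type Policy string

const (
	Once    Policy = "Once"
	Cron    Policy = "Cron"
	OnEvent Policy = "OnEvent"
)

// StatusHandler is a hypothetical, reduced interface used only for illustration.
type StatusHandler interface {
	Name() string
}

type onceHandler struct{}
type cronHandler struct{}
type onEventHandler struct{}

func (onceHandler) Name() string    { return "Once" }
func (cronHandler) Name() string    { return "Cron" }
func (onEventHandler) Name() string { return "OnEvent" }

// handlerFor never returns nil: Once, an empty policy, and any unrecognized
// value all fall back to the Once handler.
func handlerFor(p Policy) StatusHandler {
	switch p {
	case Cron:
		return cronHandler{}
	case OnEvent:
		return onEventHandler{}
	default:
		return onceHandler{}
	}
}

func main() {
	for _, p := range []Policy{Once, Cron, OnEvent, "", "Weekly"} {
		// Calling a method on the result is safe because it is never nil.
		fmt.Printf("%q -> %s\n", p, handlerFor(p).Name())
	}
}
```

With a `default: return nil` branch instead, the `Name()` call in `main` would panic for the last two inputs, which is the failure mode the comment describes.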
    jobCondition := job.Status.Conditions[0]
    result.Conditions = []datav1alpha1.Condition{
    	{
    		Type:               common.ConditionType(jobCondition.Type),
    		Status:             jobCondition.Status,
    		Reason:             jobCondition.Reason,
    		Message:            jobCondition.Message,
    		LastProbeTime:      jobCondition.LastProbeTime,
    		LastTransitionTime: jobCondition.LastTransitionTime,
    	},
    }
    if isJobSucceed {
    	result.Phase = common.PhaseComplete
    } else {
    	result.Phase = common.PhaseFailed
    }
    result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time)

Using job.Status.Conditions[0] is risky because the first condition in the slice might not be the finished condition (e.g., it could be Suspended or another type). Since finishedJobCondition has already been successfully retrieved and verified, you should use it directly to populate the status conditions and calculate the duration. This is also consistent with how CronStatusHandler is implemented.
Suggested change:

    result.Conditions = []datav1alpha1.Condition{
    	{
    		Type:               common.ConditionType(finishedJobCondition.Type),
    		Status:             finishedJobCondition.Status,
    		Reason:             finishedJobCondition.Reason,
    		Message:            finishedJobCondition.Message,
    		LastProbeTime:      finishedJobCondition.LastProbeTime,
    		LastTransitionTime: finishedJobCondition.LastTransitionTime,
    	},
    }
    if isJobSucceed {
    	result.Phase = common.PhaseComplete
    } else {
    	result.Phase = common.PhaseFailed
    }
    result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, finishedJobCondition.LastTransitionTime.Time)
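
To see why index 0 is unreliable, here is a small self-contained sketch. `Condition` is a simplified stand-in for the Kubernetes job condition type, and `finishedCondition` is a hypothetical helper written for this example; how `finishedJobCondition` is actually obtained in the PR is not shown in this thread.

```go
package main

import "fmt"

// Condition is a simplified stand-in for a Kubernetes job condition.
type Condition struct {
	Type   string // e.g. "Suspended", "Complete", "Failed"
	Status string // "True", "False", or "Unknown"
}

// finishedCondition returns the first Complete or Failed condition whose
// status is "True", or nil if the job has not finished yet.
func finishedCondition(conds []Condition) *Condition {
	for i := range conds {
		c := &conds[i]
		if (c.Type == "Complete" || c.Type == "Failed") && c.Status == "True" {
			return c
		}
	}
	return nil
}

func main() {
	// A job that was suspended earlier and later completed: the finished
	// condition is not the first element of the slice.
	conds := []Condition{
		{Type: "Suspended", Status: "False"},
		{Type: "Complete", Status: "True"},
	}
	fmt.Println("Conditions[0]:", conds[0].Type)
	if c := finishedCondition(conds); c != nil {
		fmt.Println("finished condition:", c.Type)
	}
}
```

Searching by type also makes the not-yet-finished case explicit (a nil result) instead of relying on the slice being non-empty.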
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

    @@            Coverage Diff             @@
    ##           master    #5969      +/-   ##
    ==========================================
    + Coverage   63.54%   63.56%   +0.02%
    ==========================================
      Files         478      479       +1
      Lines       33217    33278      +61
    ==========================================
    + Hits        21107    21153      +46
    - Misses      10432    10445      +13
    - Partials     1678     1680       +2

…ields

Run make update-crd to add policy and schedule fields to the DataProcess CRD schema, generated from the API type changes.

Signed-off-by: Aditya Upasani <adityaupasani29@gmail.com>

Add TestOnEventGetOperationStatus and TestOnEventGetOperationStatusJobStillRunning following the same pattern as TestOnceGetOperationStatus.

Note: pkg/controllers/v1alpha1/dataprocess package has a pre-existing test suite failure (BeforeSuite unknown decorator in suite_test.go) unrelated to this change, present on master as well.

Signed-off-by: Aditya Upasani <adityaupasani29@gmail.com>

- CronStatusHandler: add nil check for cronjobStatus.LastScheduleTime to prevent panic when CronJob hasn't been scheduled yet
- GetStatusHandler: default to OnceStatusHandler instead of returning nil for empty/unrecognized policy, preventing nil pointer dereference
- OnEventStatusHandler: use finishedJobCondition instead of job.Status.Conditions[0] to avoid picking the wrong condition, consistent with CronStatusHandler

Addresses gemini-code-assist review comments on fluid-cloudnative#5969

Signed-off-by: Aditya Upasani <adityaupasani29@gmail.com>

Thanks for working on this, @adityaupasani2! The overall direction is right: DataProcess should support Cron and OnEvent just like DataLoad and DataMigrate, and the status handler implementations follow the existing patterns well.

There are a few issues to address before this is ready to merge. The most urgent is a corrupted CRD YAML file (missing …

Happy to re-review once these are addressed.

Blocking Issues

1. CRD YAML is corrupted ( … The …
2. Missing validation: Cron policy requires Schedule to be non-empty ( … When …

Non-Blocking Observations

3. OnEventStatusHandler generates NodeAffinity, but DataProcess does not use it ( … The …
4. No unit tests for CronStatusHandler ( … Tests are provided for …
5. GetStatusHandler default case differs from DataLoad ( … DataLoad's …

    - ---
    - apiVersion: apiextensions.k8s.io/v1
    - kind: CustomResourceDefinition
    + kind: CustomResourceDefinition

The --- YAML document separator and apiVersion: apiextensions.k8s.io/v1 line were removed, and a Unicode BOM character (\ufeff) was introduced before kind:. This will break CRD installation.
The same issue exists in config/crd/bases/data.fluid.io_dataprocesses.yaml.
Please regenerate the CRD manifests with make manifests to ensure they are correct.

    //+kubebuilder:default:=Once
    //+kubebuilder:validation:Enum=Once;Cron;OnEvent
    // Policy defines the operation policy, including Once, Cron, OnEvent

When Policy is set to Cron, the Schedule field must be non-empty and contain a valid cron expression. Currently there is no validation for this in dataProcessOperation.Validate() in implement.go. Please add validation to check:

- If policy == Cron, schedule must not be empty
- Optionally validate the cron expression format
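
The two checks requested above could look roughly like the sketch below. `validateSchedule` is a hypothetical helper, not the project's `Validate()` method, and the format check is deliberately crude: it only counts fields and lets `@`-style macros through untouched. A real implementation would more likely delegate to a cron parsing library.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateSchedule is a hypothetical helper for illustration only. It takes
// plain strings instead of the Fluid API types.
func validateSchedule(policy, schedule string) error {
	if policy != "Cron" {
		return nil // schedule is only required for the Cron policy
	}
	schedule = strings.TrimSpace(schedule)
	if schedule == "" {
		return errors.New("schedule must be set when policy is Cron")
	}
	if strings.HasPrefix(schedule, "@") {
		return nil // macros such as @hourly are not checked by this sketch
	}
	// Rough format check: a standard cron expression has five fields.
	if n := len(strings.Fields(schedule)); n != 5 {
		return fmt.Errorf("schedule %q has %d fields, want 5", schedule, n)
	}
	return nil
}

func main() {
	for _, s := range []string{"", "* * * * *", "every minute"} {
		fmt.Printf("Cron %q -> %v\n", s, validateSchedule("Cron", s))
	}
	fmt.Printf("Once %q -> %v\n", "", validateSchedule("Once", ""))
}
```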
    	}
    }

    result.Conditions = []datav1alpha1.Condition{

nit: The OnEventStatusHandler includes NodeAffinity generation logic copied from DataLoad's handler. DataLoad uses NodeAffinity to track which nodes have cached data, but DataProcess is a generic processing operation. The existing OnceStatusHandler for DataProcess does not generate NodeAffinity. Consider removing it for consistency.
Blocking:

- Fix corrupted CRD YAML (BOM character and missing apiVersion/--- header introduced by manual generation), restored clean header while preserving policy and schedule fields
- Add validation requiring non-empty schedule when policy is Cron, with new DataProcessScheduleNotSpecified condition reason

Non-blocking:

- Remove NodeAffinity generation from OnEventStatusHandler since DataProcess does not use it (unlike DataLoad)
- Add comment explaining why GetStatusHandler defaults to OnceStatusHandler instead of returning nil for unrecognized policy
- Add CronStatusHandler tests covering complete, failed, still-running, and not-yet-scheduled CronJob scenarios

Addresses review comments by cheyang on fluid-cloudnative#5969

Signed-off-by: Aditya Upasani <adityaupasani29@gmail.com>

@cheyang Addressed all the feedback:

- Fixed the corrupted CRD YAML: restored the clean ---/apiVersion header while keeping the policy/schedule fields

Non-blocking:

- Removed NodeAffinity generation from OnEventStatusHandler

Ready for re-review.

Thanks for working on this feature to bring Cron and OnEvent policy support to DataProcess. The controller-side status handler logic and validation are well-structured and follow the existing DataLoad/DataMigrate patterns nicely.

However, the implementation is currently incomplete on the Helm chart side. The DataProcess chart only creates a …

Please also regenerate …

I have marked four blocking issues that need to be addressed before this can be merged. Happy to re-review once updated.

…pport

Addresses the four blocking issues raised by cheyang on fluid-cloudnative#5969:

- pkg/dataprocess/value.go: add Policy and Schedule fields to DataProcessInfo, matching DataMigrateInfo's pattern
- pkg/dataprocess/generate_values.go: populate Policy and Schedule from DataProcess.Spec in transformCommonPart
- charts/fluid-dataprocess/common/values.yaml: add policy/schedule defaults under dataProcess
- charts/fluid-dataprocess/common/templates/job.yaml: wrap in {{- if or (eq policy "") (eq policy "once") }} ... {{- end }}, same as DataMigrate's job.yaml
- charts/fluid-dataprocess/common/templates/cronjob.yaml: new file, CronJob template gated on policy == "cron", mirrors job.yaml's scriptProcessor/jobProcessor pod spec under spec.jobTemplate.spec.template, with spec.schedule from .Values.dataProcess.schedule. Without this, CronStatusHandler would never find a CronJob to monitor.
- pkg/controllers/v1alpha1/dataprocess/implement.go: GetTTL() now returns nil for Cron/OnEvent policies (matching DataMigrate), so recurring/event-driven DataProcess operations are not prematurely cleaned up via TTL
- api/v1alpha1/openapi_generated.go: add policy/schedule to DataProcessSpec schema to fix the lint CI failure (make gen-openapi did not pick up these fields; manually added matching the DataLoadSpec/DataMigrateSpec pattern)

Signed-off-by: Aditya Upasani <adityaupasani29@gmail.com>
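
The TTL rule described in the commit message above can be expressed as a small pure function. This is only a sketch of the described behavior: `ttlFor` and its parameters are hypothetical, and the real `GetTTL()` signature in implement.go is not shown in this thread.

```go
package main

import "fmt"

// ttlFor is a hypothetical helper mirroring the behavior described for
// GetTTL(): recurring (Cron) and event-driven (OnEvent) operations get no
// TTL, so they are not cleaned up after a single run.
func ttlFor(policy string, configured *int32) *int32 {
	switch policy {
	case "Cron", "OnEvent":
		return nil
	default:
		// Once (or an empty policy) keeps whatever TTL was configured.
		return configured
	}
}

func main() {
	ttl := int32(300)
	for _, p := range []string{"Once", "Cron", "OnEvent"} {
		if got := ttlFor(p, &ttl); got != nil {
			fmt.Printf("%s: ttl=%ds\n", p, *got)
		} else {
			fmt.Printf("%s: no TTL\n", p)
		}
	}
}
```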
@cheyang Addressed all four blocking issues. Ready for re-review.



Ⅰ. Describe what this PR does
DataProcess only supported Once policy, unlike DataLoad and DataMigrate which support Once, Cron, and OnEvent. This PR adds:
- Policy and Schedule fields to DataProcessSpec in API types, matching the pattern from DataLoadSpec and DataMigrateSpec
- CronStatusHandler for scheduled DataProcess execution
- OnEventStatusHandler for event-triggered DataProcess execution
- Updates GetStatusHandler() to switch on policy instead of always returning OnceStatusHandler
Ⅱ. Does this pull request fix one issue?
Fixes #5948
Ⅲ. List the added test cases
Unit tests for CronStatusHandler and OnEventStatusHandler are pending — opening as draft first to confirm the API design direction before writing tests.
Ⅳ. Describe how to verify it
Create a DataProcess with spec.policy: Cron and spec.schedule: "* * * * *" — verify the operation runs on schedule and status updates correctly.
Ⅴ. Special notes for reviews
Draft — design confirmation needed. Is adding Policy and Schedule fields to DataProcessSpec the right approach, or is DataProcess intentionally limited to Once? Happy to finalize tests and implementation once direction is confirmed by maintainers.